#ifndef XRPL_RESOURCE_LOGIC_H_INCLUDED
#define XRPL_RESOURCE_LOGIC_H_INCLUDED

#include <xrpl/basics/Log.h>
#include <xrpl/basics/UnorderedContainers.h>
#include <xrpl/basics/chrono.h>
#include <xrpl/beast/clock/abstract_clock.h>
#include <xrpl/beast/insight/Insight.h>
#include <xrpl/beast/utility/PropertyStream.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/json/json_value.h>
#include <xrpl/protocol/jss.h>
#include <xrpl/resource/Fees.h>
#include <xrpl/resource/Gossip.h>
#include <xrpl/resource/detail/Import.h>

#include <mutex>

namespace ripple {
namespace Resource {

class Logic
{
private:
    using clock_type = Stopwatch;
    using Imports = hash_map<std::string, Import>;
    using Table = hash_map<Key, Entry, Key::hasher, Key::key_equal>;
    using EntryIntrusiveList = beast::List<Entry>;

    /** Insight meters maintained by Logic.

        Both meters are created from the supplied collector at construction.
    */
    struct Stats
    {
        /** Create the "warn" and "drop" meters from `collector`.

            The constructor is explicit so a collector pointer is never
            silently converted into a Stats object.
        */
        explicit Stats(beast::insight::Collector::ptr const& collector)
            : warn(collector->make_meter("warn"))
            , drop(collector->make_meter("drop"))
        {
        }

        // Incremented by Logic::warn() each time a load warning is issued.
        beast::insight::Meter warn;

        // Incremented by Logic::disconnect() each time a consumer is dropped.
        beast::insight::Meter drop;
    };

    // Meters counting issued warnings and drops.
    Stats m_stats;

    // Clock consulted for every timestamp taken by this class.
    Stopwatch& m_clock;

    // Destination for log output.
    beast::Journal m_journal;

    // Guards all of the containers below. It is recursive because methods
    // that already hold it call other locking methods (for example warn()
    // and disconnect() call charge()).
    std::recursive_mutex lock_;

    // Table of all entries
    Table table_;

    // Because the following are intrusive lists, a given Entry may be in
    // at most one list at a given instant.  The Entry must be removed from
    // one list before placing it in another.

    // List of all active inbound entries
    EntryIntrusiveList inbound_;

    // List of all active outbound entries
    EntryIntrusiveList outbound_;

    // List of all active admin entries
    EntryIntrusiveList admin_;

    // List of all inactive entries (reference count of zero, awaiting
    // expiration in periodicActivity())
    EntryIntrusiveList inactive_;

    // All imported gossip data, keyed by the origin string
    Imports importTable_;

    //--------------------------------------------------------------------------
public:
    Logic(
        beast::insight::Collector::ptr const& collector,
        clock_type& clock,
        beast::Journal journal)
        : m_stats(collector), m_clock(clock), m_journal(journal)
    {
    }

    /** Destroy the logic, emptying both tables explicitly. */
    ~Logic()
    {
        // These have to be cleared before the Logic is destroyed
        // since their destructors call back into the class.
        //
        // Order matters here as well: the import table has to be
        // destroyed before the consumer table. Each Import item holds a
        // Consumer that refers to an Entry stored in table_, so the imports
        // must go first (presumably so that releasing those Consumers still
        // finds live entries).
        //
        importTable_.clear();
        table_.clear();
    }

    /** Obtain a Consumer for an inbound connection from `address`.

        The entry is keyed on kindInbound together with
        `address.at_port(0)`. If no entry exists one is created; otherwise
        the existing entry is reused and its reference count is incremented.
    */
    Consumer
    newInboundEndpoint(beast::IP::Endpoint const& address)
    {
        Entry* entry = nullptr;

        {
            std::lock_guard guard(lock_);
            auto const emplaced = table_.emplace(
                std::piecewise_construct,
                std::make_tuple(kindInbound, address.at_port(0)),  // Key
                std::make_tuple(m_clock.now()));                   // Entry
            auto const& slot = emplaced.first;
            bool const created = emplaced.second;

            entry = &slot->second;
            entry->key = &slot->first;

            // On the first reference the entry becomes active. A
            // pre-existing entry with no references was sitting on the
            // inactive list and has to be unlinked from it first.
            if (++entry->refcount == 1)
            {
                if (!created)
                    inactive_.erase(inactive_.iterator_to(*entry));
                inbound_.push_back(*entry);
            }
        }

        JLOG(m_journal.debug()) << "New inbound endpoint " << *entry;

        return Consumer(*this, *entry);
    }

    /** Obtain a Consumer for an outbound connection to `address`.

        The entry is keyed on kindOutbound together with the address exactly
        as given. If no entry exists one is created; otherwise the existing
        entry is reused and its reference count is incremented.
    */
    Consumer
    newOutboundEndpoint(beast::IP::Endpoint const& address)
    {
        Entry* entry = nullptr;

        {
            std::lock_guard guard(lock_);
            auto const emplaced = table_.emplace(
                std::piecewise_construct,
                std::make_tuple(kindOutbound, address),  // Key
                std::make_tuple(m_clock.now()));         // Entry
            auto const& slot = emplaced.first;
            bool const created = emplaced.second;

            entry = &slot->second;
            entry->key = &slot->first;

            // On the first reference the entry becomes active. A
            // pre-existing entry with no references was sitting on the
            // inactive list and has to be unlinked from it first.
            if (++entry->refcount == 1)
            {
                if (!created)
                    inactive_.erase(inactive_.iterator_to(*entry));
                outbound_.push_back(*entry);
            }
        }

        JLOG(m_journal.debug()) << "New outbound endpoint " << *entry;

        return Consumer(*this, *entry);
    }

    /**
     * Create endpoint that should not have resource limits applied. Other
     * restrictions, such as permission to perform certain RPC calls, may be
     * enabled.
     *
     * The entry is keyed on kindUnlimited together with
     * `address.at_port(1)` and, while referenced, is tracked on the admin
     * list.
     */
    Consumer
    newUnlimitedEndpoint(beast::IP::Endpoint const& address)
    {
        Entry* entry = nullptr;

        {
            std::lock_guard guard(lock_);
            auto const emplaced = table_.emplace(
                std::piecewise_construct,
                std::make_tuple(kindUnlimited, address.at_port(1)),  // Key
                std::make_tuple(m_clock.now()));                     // Entry
            auto const& slot = emplaced.first;
            bool const created = emplaced.second;

            entry = &slot->second;
            entry->key = &slot->first;

            // On the first reference the entry becomes active. A
            // pre-existing entry with no references was sitting on the
            // inactive list and has to be unlinked from it first.
            if (++entry->refcount == 1)
            {
                if (!created)
                    inactive_.erase(inactive_.iterator_to(*entry));
                admin_.push_back(*entry);
            }
        }

        JLOG(m_journal.debug()) << "New unlimited endpoint " << *entry;

        return Consumer(*this, *entry);
    }

    /** Returns a Json::objectValue listing the active entries whose combined
        balance is at or above warningThreshold. */
    Json::Value
    getJson()
    {
        int const threshold = warningThreshold;
        return getJson(threshold);
    }

    /** Returns a Json::objectValue.

        One member is produced per active entry (inbound, outbound, then
        admin) whose local balance plus remote balance is at or above
        `threshold`. The member name is the entry's string form and its value
        holds the local balance, the remote balance and the entry type.

        @param threshold Minimum combined balance for an entry to be listed.
    */
    Json::Value
    getJson(int threshold)
    {
        auto const now = m_clock.now();

        Json::Value result(Json::objectValue);
        std::lock_guard guard(lock_);

        // Appends every qualifying entry of `list`, tagging it with `type`.
        auto const append = [&](EntryIntrusiveList& list, char const* type) {
            for (auto& item : list)
            {
                int localBalance = item.local_balance.value(now);
                if ((localBalance + item.remote_balance) < threshold)
                    continue;

                Json::Value& entry =
                    (result[item.to_string()] = Json::objectValue);
                entry[jss::local] = localBalance;
                entry[jss::remote] = item.remote_balance;
                entry[jss::type] = type;
            }
        };

        append(inbound_, "inbound");
        append(outbound_, "outbound");
        append(admin_, "admin");

        return result;
    }

    /** Build the gossip describing this server's inbound consumers.

        Only inbound entries whose local balance is at least
        minimumGossipBalance are included.
    */
    Gossip
    exportConsumers()
    {
        auto const now = m_clock.now();

        Gossip result;
        std::lock_guard guard(lock_);

        // Upper bound: every inbound entry might qualify.
        result.items.reserve(inbound_.size());

        for (auto& candidate : inbound_)
        {
            Gossip::Item item;
            item.balance = candidate.local_balance.value(now);
            if (item.balance < minimumGossipBalance)
                continue;

            item.address = candidate.key->address;
            result.items.push_back(item);
        }

        return result;
    }

    //--------------------------------------------------------------------------

    void
    importConsumers(std::string const& origin, Gossip const& gossip)
    {
        auto const elapsed = m_clock.now();
        {
            std::lock_guard _(lock_);
            auto [resultIt, resultInserted] = importTable_.emplace(
                std::piecewise_construct,
                std::make_tuple(origin),  // Key
                std::make_tuple(
                    m_clock.now().time_since_epoch().count()));  // Import

            if (resultInserted)
            {
                // This is a new import
                Import& next(resultIt->second);
                next.whenExpires = elapsed + gossipExpirationSeconds;
                next.items.reserve(gossip.items.size());

                for (auto const& gossipItem : gossip.items)
                {
                    Import::Item item;
                    item.balance = gossipItem.balance;
                    item.consumer = newInboundEndpoint(gossipItem.address);
                    item.consumer.entry().remote_balance += item.balance;
                    next.items.push_back(item);
                }
            }
            else
            {
                // Previous import exists so add the new remote
                // balances and then deduct the old remote balances.

                Import next;
                next.whenExpires = elapsed + gossipExpirationSeconds;
                next.items.reserve(gossip.items.size());
                for (auto const& gossipItem : gossip.items)
                {
                    Import::Item item;
                    item.balance = gossipItem.balance;
                    item.consumer = newInboundEndpoint(gossipItem.address);
                    item.consumer.entry().remote_balance += item.balance;
                    next.items.push_back(item);
                }

                Import& prev(resultIt->second);
                for (auto& item : prev.items)
                {
                    item.consumer.entry().remote_balance -= item.balance;
                }

                std::swap(next, prev);
            }
        }
    }

    //--------------------------------------------------------------------------

    // Called periodically to expire entries and groom the table.
    //
    void
    periodicActivity()
    {
        std::lock_guard _(lock_);

        auto const elapsed = m_clock.now();

        for (auto iter(inactive_.begin()); iter != inactive_.end();)
        {
            if (iter->whenExpires <= elapsed)
            {
                JLOG(m_journal.debug()) << "Expired " << *iter;
                auto table_iter = table_.find(*iter->key);
                ++iter;
                erase(table_iter);
            }
            else
            {
                break;
            }
        }

        auto iter = importTable_.begin();
        while (iter != importTable_.end())
        {
            Import& import(iter->second);
            if (iter->second.whenExpires <= elapsed)
            {
                for (auto item_iter(import.items.begin());
                     item_iter != import.items.end();
                     ++item_iter)
                {
                    item_iter->consumer.entry().remote_balance -=
                        item_iter->balance;
                }

                iter = importTable_.erase(iter);
            }
            else
                ++iter;
        }
    }

    //--------------------------------------------------------------------------

    // Returns the disposition based on the balance and thresholds:
    // drop at or above dropThreshold, otherwise warn at or above
    // warningThreshold, otherwise ok.
    static Disposition
    disposition(int balance)
    {
        bool const shouldDrop = balance >= dropThreshold;
        if (shouldDrop)
            return Disposition::drop;

        bool const shouldWarn = balance >= warningThreshold;
        return shouldWarn ? Disposition::warn : Disposition::ok;
    }

    /** Remove an unreferenced entry from the inactive list and the table.

        @param iter Iterator into the table designating the entry to remove.
                    The entry's reference count is asserted to be zero.
    */
    void
    erase(Table::iterator iter)
    {
        std::lock_guard guard(lock_);
        Entry& doomed = iter->second;
        XRPL_ASSERT(
            doomed.refcount == 0,
            "ripple::Resource::Logic::erase : entry not used");
        // Unlink from the intrusive list before the table destroys the Entry.
        inactive_.erase(inactive_.iterator_to(doomed));
        table_.erase(iter);
    }

    /** Add one reference to `entry`. */
    void
    acquire(Entry& entry)
    {
        std::lock_guard guard(lock_);
        entry.refcount += 1;
    }

    /** Drop one reference to `entry`.

        When the last reference goes away the entry is moved from the active
        list matching its kind to the inactive list, and its expiration time
        is set to the current time plus secondsUntilExpiration.
    */
    void
    release(Entry& entry)
    {
        std::lock_guard guard(lock_);

        --entry.refcount;
        if (entry.refcount != 0)
            return;

        JLOG(m_journal.debug()) << "Inactive " << entry;

        // Unlink from whichever active list corresponds to the entry's kind.
        switch (entry.key->kind)
        {
            case kindInbound:
                inbound_.erase(inbound_.iterator_to(entry));
                break;
            case kindOutbound:
                outbound_.erase(outbound_.iterator_to(entry));
                break;
            case kindUnlimited:
                admin_.erase(admin_.iterator_to(entry));
                break;
            default:
                // LCOV_EXCL_START
                UNREACHABLE(
                    "ripple::Resource::Logic::release : invalid entry "
                    "kind");
                break;
                // LCOV_EXCL_STOP
        }

        inactive_.push_back(entry);
        entry.whenExpires = m_clock.now() + secondsUntilExpiration;
    }

    /** Apply a fee to `entry` and report the resulting disposition.

        The charge is logged at a severity chosen from the size of the fee:
        warn, info, debug or trace.

        @param entry   The entry being charged.
        @param fee     The charge to apply.
        @param context Optional text appended, in parentheses, to the log
                       message.
        @return The disposition for the balance returned by Entry::add().
    */
    Disposition
    charge(Entry& entry, Charge const& fee, std::string context = {})
    {
        static constexpr Charge::value_type feeLogAsWarn = 3000;
        static constexpr Charge::value_type feeLogAsInfo = 1000;
        static constexpr Charge::value_type feeLogAsDebug = 100;
        static_assert(
            feeLogAsWarn > feeLogAsInfo && feeLogAsInfo > feeLogAsDebug &&
            feeLogAsDebug > 10);

        // Maps a fee to the journal stream of matching severity.
        static auto streamFor = [](Resource::Charge::value_type cost,
                                   beast::Journal& journal) {
            if (cost < feeLogAsDebug)
                return journal.trace();
            if (cost < feeLogAsInfo)
                return journal.debug();
            if (cost < feeLogAsWarn)
                return journal.info();
            return journal.warn();
        };

        // Decorate a non-empty context before taking the lock.
        if (!context.empty())
        {
            context.insert(0, " (");
            context.append(")");
        }

        std::lock_guard guard(lock_);
        auto const now = m_clock.now();
        int const newBalance = entry.add(fee.cost(), now);
        JLOG(streamFor(fee.cost(), m_journal))
            << "Charging " << entry << " for " << fee << context;
        return disposition(newBalance);
    }

    /** Issue a load warning for `entry` if one is due.

        A warning is due when the entry is not unlimited, its balance is at
        or above warningThreshold, and no warning has already been recorded
        for the current clock reading. When a warning is issued the entry is
        charged feeWarning, the warning time is recorded, and the warn meter
        is incremented.

        @return `true` if a warning was issued.
    */
    bool
    warn(Entry& entry)
    {
        if (entry.isUnlimited())
            return false;

        std::lock_guard _(lock_);

        // One clock sample is used for both the balance evaluation and the
        // "already warned at this instant" comparison, so the two checks
        // always refer to the same point in time.
        auto const now = m_clock.now();
        if (entry.balance(now) < warningThreshold ||
            now == entry.lastWarningTime)
        {
            return false;
        }

        charge(entry, feeWarning);
        entry.lastWarningTime = now;

        JLOG(m_journal.info()) << "Load warning: " << entry;
        ++m_stats.warn;
        return true;
    }

    /** Decide whether the consumer behind `entry` should be disconnected.

        Unlimited entries are never disconnected. Otherwise the consumer is
        dropped when its balance is at or above dropThreshold; in that case
        it is also charged feeDrop and the drop meter is incremented.

        @return `true` if the consumer should be dropped.
    */
    bool
    disconnect(Entry& entry)
    {
        if (entry.isUnlimited())
            return false;

        std::lock_guard guard(lock_);
        auto const now = m_clock.now();
        int const current = entry.balance(now);
        if (current < dropThreshold)
            return false;

        JLOG(m_journal.warn())
            << "Consumer entry " << entry << " dropped with balance "
            << current << " at or above drop threshold " << dropThreshold;

        // Adding feeDrop at this point keeps the dropped connection
        // from re-connecting for at least a little while after it is
        // dropped.
        charge(entry, feeDrop);
        ++m_stats.drop;
        return true;
    }

    /** Returns the balance of `entry` evaluated at the current clock time. */
    int
    balance(Entry& entry)
    {
        std::lock_guard guard(lock_);
        auto const now = m_clock.now();
        return entry.balance(now);
    }

    //--------------------------------------------------------------------------

    /** Write one property-stream map per entry of `list` into `items`.

        Each map carries the entry's name and balance; the reference count
        and remote balance are written only when they are non-zero. This
        function does not take the lock itself.

        @param now   Time at which balances are evaluated.
        @param items Set that receives the per-entry maps.
        @param list  Entries to describe.
    */
    void
    writeList(
        clock_type::time_point const now,
        beast::PropertyStream::Set& items,
        EntryIntrusiveList& list)
    {
        for (auto& current : list)
        {
            beast::PropertyStream::Map row(items);

            bool const referenced = current.refcount != 0;
            if (referenced)
                row["count"] = current.refcount;

            row["name"] = current.to_string();
            row["balance"] = current.balance(now);

            bool const hasRemote = current.remote_balance != 0;
            if (hasRemote)
                row["remote_balance"] = current.remote_balance;
        }
    }

    /** Write the contents of all four entry lists to a property stream.

        The lists are emitted, in order, as the sets "inbound", "outbound",
        "admin" and "inactive", all evaluated at one clock reading and under
        the lock.
    */
    void
    onWrite(beast::PropertyStream::Map& map)
    {
        auto const now = m_clock.now();

        std::lock_guard guard(lock_);

        // Each set lives only for the duration of one call, so it is closed
        // before the next one is opened.
        auto const emit = [&](char const* name, EntryIntrusiveList& list) {
            beast::PropertyStream::Set s(name, map);
            writeList(now, s, list);
        };

        emit("inbound", inbound_);
        emit("outbound", outbound_);
        emit("admin", admin_);
        emit("inactive", inactive_);
    }
};

}  // namespace Resource
}  // namespace ripple

#endif
